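# NOTE(review): the lines below look like a web file-manager page header that
# was captured together with the module; they are kept only as comments.
# HEX
# Server: LiteSpeed
# System: Linux eticaretsrv4.isimtescil.net 3.10.0-962.3.2.lve1.5.26.7.el7.x86_64 #1 SMP Wed Oct 2 07:53:12 EDT 2019 x86_64
# User: sioberen (1086)
# PHP: 7.3.33
# Disabled: NONE
# Upload Files
# File: //opt/alt/python37/lib/python3.7/site-packages/clcommon/utils.py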
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import io
import re
import os
import sys
import grp
import subprocess
from typing import Dict  # NOQA
import platform

from xml.dom import minidom, DOMException
from xml.parsers.expat import ExpatError
from configparser import ConfigParser, Error

import secureio

RHN_SYSTEMID_FILE = '/etc/sysconfig/rhn/systemid'


class ExternalProgramFailed(Exception):
    """
    Error raised when an external program can not be started
    or finishes unsuccessfully
    """

    def __init__(self, message):
        super().__init__(message)


def create_symlink(link_value, link_path):
    """
    Make sure that link_path is a symlink which points to link_value.
    Nothing is done when the symlink already has the wanted value; otherwise
    whatever is stored at link_path is removed (best effort) and the symlink
    is created anew.
    :param link_value: path that symlink should point to (symlink value)
    :type link_value: str
    :param link_path: path where to create symlink
    :type link_path: str
    """
    current_target = None
    if os.path.islink(link_path):
        try:
            current_target = os.readlink(link_path)
        except OSError:
            # Unreadable link is handled like a link with a wrong value
            current_target = None
    if current_target == link_value:
        return
    try:
        os.unlink(link_path)
    except OSError:
        # Nothing to remove (or not removable) - os.symlink below reports real problems
        pass
    os.symlink(link_value, link_path)


def get_file_lines(path):
    """
    Return all lines of a regular file
    :param path: path to file
    :return: list of file's lines (with line endings);
             empty list when path is not an existing regular file
    """
    if not os.path.isfile(path):
        return []
    with open(path, 'r') as source:
        return source.readlines()


def write_file_lines(path, content, mode):
    """
    Open file in the given mode and write all supplied lines into it.
    No line separators are added.
    :param path: path to file
    :param content: list of lines for writing to file
    :param mode: open mode
    :return: None
    """
    with open(path, mode) as target:
        target.writelines(content)


def check_command(cmdname):
    """
    Make sure that the command path exists.
    Prints a message and terminates the process with exit code 1 otherwise.
    :param cmdname: path to the command
    """
    if os.path.exists(cmdname):
        return
    print('No such command (%s)' % (cmdname,))
    sys.exit(1)


def run_command(cmd, env_data=None, return_full_output=False, std_in=None, convert_to_str=True):
    """
    Runs external process and returns output
    :param cmd: command and arguments as a list
    :param env_data: environment data for process
    :param return_full_output: if true, returns (ret_code, std_out, std_err)
    :param std_in: data to pass to the process stdin; when empty or None
                   the process stdin is connected to the null device
    :param convert_to_str: if true, the streams are opened in text mode (str),
                           otherwise the output is returned as bytes
    :return: process stdout (stderr is merged into it) if return_full_output==False
             else - tuple (ret_code, std_out, std_err) without any checking
    :raises ExternalProgramFailed: the process can not be started, or
             return_full_output is False and the process exit code is not 0
    """
    cmd_line = ' '.join(cmd)
    # stderr is captured separately only when the caller asked for the full output
    stderr_target = subprocess.PIPE if return_full_output else subprocess.STDOUT
    # subprocess.DEVNULL is used instead of open('/dev/null'):
    # the file object opened here was never closed
    stdin_target = subprocess.PIPE if std_in else subprocess.DEVNULL
    try:
        process = subprocess.Popen(
            cmd,
            stdin=stdin_target,
            stdout=subprocess.PIPE,
            stderr=stderr_target,
            close_fds=True,
            env=env_data,
            text=convert_to_str)
    except OSError as oserr:
        raise ExternalProgramFailed('%s. Can not run command: %s' % (cmd_line, str(oserr)))
    std_out, std_err = process.communicate(std_in if std_in else None)
    if return_full_output:
        return process.returncode, std_out, std_err
    if process.returncode != 0:
        if not convert_to_str:
            raise ExternalProgramFailed('Error during command: %s' % cmd_line)
        # std_err is None here (stderr is merged into stdout), so the message
        # normally carries the command line and its combined output
        raise ExternalProgramFailed(std_err or 'output of the command: %s\n%s' % (cmd_line, std_out))
    return std_out


def exec_utility(util_path, params):
    """
    Run the utility with the given parameters and capture its stdout
    :param util_path: path to the executable file to run
    :param params: utility parameters
    :return: tuple (ret_code, utility_stdout); stdout is returned as text
             with leading/trailing whitespace stripped
    """
    command = [util_path] + list(params)
    child = subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
    captured = child.communicate()[0]
    return child.returncode, captured.strip()


def delete_line_from_file(path, line):
    """
    Remove every occurrence of the line from the file.
    The file is rewritten even when nothing has been removed.
    :param path: path to file
    :type path: string
    :param line: line to delete without EOL ('\n')
    :type line: string
    :return: True when line(s) have been deleted, False otherwise (specified line is not found)
    :rtype bool
    """
    original_lines = get_file_lines(path)
    kept_lines = []
    for current in original_lines:
        # Lines are compared without their trailing EOL characters
        if current.rstrip('\n') != line:
            kept_lines.append(current)
    write_file_lines(path, kept_lines, 'w+')
    return len(kept_lines) != len(original_lines)


def is_root_or_exit():
    """
    Return silently when the current user is effectively root or is a member
    of the 'clsupergid' group; otherwise print an error to stderr and
    terminate the process with exit code -1
    """
    if os.geteuid() == 0:
        return
    try:
        # Additional admins placed in this special group
        # by lvemanager hooks to add root-like privileges to them
        super_gid = grp.getgrnam('clsupergid').gr_gid
    except KeyError:
        # No group - no privileges
        pass
    else:
        if super_gid in os.getgroups() or os.getegid() == super_gid:
            return
    print('Error: root privileges required. Abort.', file=sys.stderr)
    sys.exit(-1)


def is_ea4():
    """
    Detect EA4 by the presence of its marker file
    :return: True - EA4 present; False - EA4 absent
    """
    ea4_marker = '/etc/cpanel/ea4/is_ea4'
    return os.path.isfile(ea4_marker)


def grep(pattern, path=None, fixed_string=False, match_any_position=True, multiple_search=False, data_from_file=None):
    """
    Search lines for a pattern
    :param pattern: pattern for search
    :param path: path to file; it is read only when data_from_file is None
    :param fixed_string: if True - search only fixed string,
           False - search by regexp
    :param match_any_position: if True - search any match position,
           False - search only from string begin
    :param multiple_search: if True - search all match,
                            False - search first match
    :param data_from_file: lines to search in (used instead of reading path)
    :return: Generator with matched strings
    """
    lines = get_file_lines(path) if data_from_file is None else data_from_file
    if fixed_string:
        if match_any_position:
            def is_match(text):
                return pattern in text
        else:
            def is_match(text):
                return text.startswith(pattern)
    else:
        regexp = pattern
        # Anchor the regexp to the begin of a string
        # when only matches at the begin are wanted
        if not regexp.startswith('^') and not match_any_position:
            regexp = '^{}'.format(regexp)
        is_match = re.compile(regexp).search
    for line in lines:
        if not is_match(line):
            continue
        yield line
        if not multiple_search:
            # Only the first match was requested
            return


def _parse_systemid_file():
    """
    Parse the file RHN_SYSTEMID_FILE as XML
    :rtype: xml.dom.minidom.Document
    """
    document = minidom.parse(RHN_SYSTEMID_FILE)
    return document


def get_rhn_systemid_value(name):
    """
    Find a member in the systemid xml by name and return its value.
    None is returned when the file can not be read or parsed, when no member
    has the requested name or when the xml does not have the expected structure.
    :param name: text of the member's <name> element to look for
    :type name: str
    :rtype: str|None
    """
    try:
        rhn_systemid_xml = _parse_systemid_file()
        for member in rhn_systemid_xml.getElementsByTagName('member'):
            if member.getElementsByTagName("name")[0].firstChild.nodeValue == name:
                # The text is read from the first child of the first child of <value>
                return member.getElementsByTagName("value")[0].firstChild.firstChild.nodeValue
    except (IOError, IndexError, KeyError, AttributeError, ExpatError, DOMException):
        # AttributeError: firstChild is None for an element without children
        # (e.g. empty <name/> or <value>), which is one more kind of malformed input
        return None
    return None


def get_file_system_in_which_file_is_stored_on(file_path):
    # type: (str) -> Dict[str, str]
    """
    Detect the file system on which the file is stored.
    E.g., the file can be stored on NFS and this can affect normal work with the file.
    We want to receive information about FS in emergency situations during reading or writing.

    The external commands are started without a shell, so special characters
    in file_path (spaces, ';', '$(...)' etc.) are never interpreted as shell syntax.

    :param file_path: path to file, for which we want to detect file system
    :return: dict, which contains two keys:
            key 'success' is False if we got an error or True if we got a normal result
            key 'details' contains the error string if 'success' is False
            or the matching line(s) of `mount` output if 'success' is True
    """
    result = {
        'success': False,
        'details': 'File "%s" isn\'t exists' % file_path,
    }
    if not os.path.exists(file_path):
        return result
    error_template = 'We can\'t get file system for file "%s". Exception "%s"'
    # Equivalent of:  mount | grep "on $(df <file_name> | tail -n 1 | awk '{print $NF}') type"
    # Result:         /usr/tmpDSK on /var/tmp type ext3 (rw,nosuid,noexec,relatime,data=ordered)
    try:
        df_lines = subprocess.check_output(
            ['df', '--', file_path],
            text=True
        ).strip().splitlines()
        # Mount point is the last field of the last line of df output
        mount_point = df_lines[-1].split()[-1] if df_lines else ''
        mount_output = subprocess.check_output(['mount'], text=True)
    except (subprocess.CalledProcessError, OSError) as err:
        result['details'] = error_template % (file_path, err)
        return result

    marker = 'on %s type' % mount_point
    matched_lines = [line for line in mount_output.splitlines() if marker in line]
    if matched_lines:
        result['success'] = True
        result['details'] = '\n'.join(matched_lines).strip()
    else:
        # The mount point is absent in mount output - report it as an error
        result['details'] = error_template % (
            file_path,
            'mount point "%s" is not found in mount output' % mount_point,
        )

    return result


def is_testing_enabled_repo(repo_path='/etc/yum.repos.d/cloudlinux.repo'):
    """
    Checks if testing is enabled in /etc/yum.repos.d/cloudlinux.repo config
    :param repo_path: path to the repo config file to check
    :return: bool value if testing enabled or not; False is also returned when
             the file, the 'cloudlinux-updates-testing' section or its 'enabled'
             option is absent, or when the config can not be parsed
    """
    parser = ConfigParser(interpolation=None, strict=False)
    try:
        # ConfigParser.read() takes file names (not file objects)
        # and silently skips files which can not be opened
        parser.read(repo_path)
        return parser.getboolean('cloudlinux-updates-testing', 'enabled')
    # Error is the base exception of the config parser;
    # ValueError is raised by getboolean() for a non-boolean value
    except (Error, ValueError):
        return False


def get_cl_version():
    """
    Returns cl version taking into account release version
    E.g: release = 2.6.32-896.16.1.lve1.4.54.el6.x86_64
    el6 = cl6
    el8 = cl8
    ........
    When the kernel release has no known 'elN' marker, the version is taken
    from the first number of 'os_release' value of the RHN systemid file.
    :return appropriate version string or None if the version is not detected
    """
    check_vals_decoder = {'el6.': 'cl6',
                          'el6h.': 'cl6h',
                          'el7.': 'cl7',
                          'el7h.': 'cl7h',
                          'el8.': 'cl8'}
    release = platform.release()
    for check_val, cl_version in check_vals_decoder.items():
        if check_val in release:
            return cl_version
    release = get_rhn_systemid_value("os_release")
    if not release:
        # Value is not available - nothing to detect the version from
        return None
    # Only the first number is significant: a plain substring check
    # would detect '7.6' or '8.6' as cl6
    major_version = re.search(r'\d+', release)
    if major_version is None:
        return None
    return {'6': 'cl6', '7': 'cl7', '8': 'cl8'}.get(major_version.group())


def is_litespeed_running():
    """
    Detects that server works under Litespeed:
    both the pid file and the admin htpasswd status file must exist
    """
    required_files = (
        '/tmp/lshttpd/lshttpd.pid',
        '/usr/local/lsws/admin/htpasswds/status',
    )
    return all(os.path.isfile(required) for required in required_files)


def get_passenger_package_name():
    """
    Choose passenger package name: the EA4 one when EA4 is detected,
    the generic one otherwise
    :rtype: str
    """
    return 'ea-apache24-mod-alt-passenger' if is_ea4() else 'alt-mod-passenger'


def is_package_installed(package_name):
    """
    Query rpm for the package
    :param package_name: str
    :return: True when `rpm -q <package_name>` succeeds, False when it fails
    :rtype: bool
    """
    try:
        run_command(['rpm', '-q', package_name])
    except ExternalProgramFailed:
        installed = False
    else:
        installed = True
    return installed


def get_rpm_db_errors():
    """
    Check RPM DB as described in https://access.redhat.com/articles/3763
    :return: None - No RPM DB errors
            string_message - Error description
    """
    doc_link = 'https://cloudlinux.zendesk.com/hc/en-us/articles/115004075294-Fix-rpmdb-Thread-died-in-Berkeley-DB-library'
    try:
        # cd /var/lib/rpm
        # /usr/lib/rpm/rpmdb_verify Packages
        # If check finished with error,
        #  the rpmdb_verify utility returns 1 and prints error messages both to stdout and stderr
        # Output is decoded to text, otherwise the message below would contain b'...' reprs;
        # undecodable bytes are replaced instead of raising an error
        prc = subprocess.Popen(['/usr/lib/rpm/rpmdb_verify', 'Packages'], shell=False, cwd="/var/lib/rpm",
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               text=True, errors='replace')
        std_out, std_err = prc.communicate()
        if prc.returncode != 0:
            # Check error
            return 'RPM DB check error: %s\n%s. See doc: %s' % (std_out, std_err, doc_link)
    except OSError as e:
        # The utility or its working directory is not available
        return str(e)
    # There is no RPM DB errors
    return None


def silence_stdout_until_process_exit():
    """
    Replace sys.stdout and sys.stderr with in-memory buffers.

    Upon process exit, Sentry sometimes prints:

        Sentry is attempting to send 1 pending error messages
        Waiting up to 10 seconds
        Press Ctrl-C to quit

    This causes broken JSON in output.
    See also this issue: https://github.com/getsentry/raven-python/issues/904
    """
    sys.stdout, sys.stderr = io.StringIO(), io.StringIO()


def mod_makedirs(path, mod):
    """
    Create directory (with all intermediate ones) with desired permissions

    Changed in version 3.7: The mode argument of os.makedirs no longer affects
    the file permission bits of newly-created intermediate-level directories,
    that is why the umask is set while the directories are created

    :param path: directory to create
    :param mod: desired permissions
    """
    # Umask is the complement of the permission bits of the desired mode
    umask_value = 0o777 ^ (mod & 0o777)
    with secureio.set_umask(umask_value):
        os.makedirs(path, mod)